home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- from __future__ import with_statement
- import os
- import codecs
- from threading import RLock as Lock
- from time import time
- from path import path
- import Queue as Q
-
def tail(filename, maxbytes, encoding=None):
    """Return at most the last ``maxbytes`` bytes of a file.

    ``filename`` is either a path or an already-open object with ``read``,
    ``seek`` and ``tell``.  An object passed in is never closed here; a file
    opened from a path is always closed, even if reading fails.

    ``encoding`` is only used when ``filename`` is a path: the file is then
    opened through ``codecs.open`` and decoded text is returned.
    NOTE(review): the start offset is a byte offset, so with a multi-byte
    encoding it may land inside a character -- confirm callers tolerate that.

    Raises ValueError if ``maxbytes`` is not positive.
    """
    if maxbytes <= 0:
        raise ValueError('maxbytes must be more than 0')

    if hasattr(filename, 'read'):
        f = filename
        # Measure the stream instead of seeking to -maxbytes from the end:
        # real file objects raise when that lands before the start.
        f.seek(0, os.SEEK_END)
        f.seek(max(f.tell() - maxbytes, 0))
        return f.read()

    filesize = os.stat(filename).st_size
    if encoding is not None:
        f = codecs.open(filename, 'rb', encoding)
    else:
        f = open(filename, 'rb')
    try:
        f.seek(max(filesize - maxbytes, 0))
        return f.read()
    finally:
        f.close()
-
-
def streamcopy(fobjin, fobjouts, limit=None, chunk=4096):
    """Copy data read from ``fobjin`` to one or more output streams.

    ``fobjouts`` may be a single object with a ``write`` method or an
    iterable of such objects; every chunk read is written to each of them.
    ``limit`` and ``chunk`` are handed straight to ``functioncopy``, whose
    return value is returned unchanged.
    """
    # A lone writable object is treated as a one-element list of targets.
    targets = [fobjouts] if hasattr(fobjouts, 'write') else fobjouts

    def fan_out(data):
        for target in targets:
            target.write(data)

    return functioncopy(fobjin.read, fan_out, limit, chunk)
-
-
def functioncopy(reader, writer, limit=None, chunk=4096):
    """Pump data from ``reader`` to ``writer`` and return the amount copied.

    ``reader(n)`` is called for up to ``n`` items at a time and must return
    something with a length (empty/falsy at end of input); each non-empty
    result is passed to ``writer``.

    ``limit`` caps the total amount requested from ``reader``.  ``None`` or
    a negative value means "no limit".  ``chunk`` is the largest single
    request.

    Returns the total length of everything handed to ``writer``.  Raises
    TypeError if either ``reader`` or ``writer`` is not callable.
    """
    if not callable(reader) or not callable(writer):
        raise TypeError(
            "Both 'reader' and 'writer' must be callable. "
            "Got (%r, %r) instead." % (reader, writer))

    # A negative limit never stopped the copy, so keep treating it as
    # "no limit" -- but without asking the reader for a negative size.
    if limit is not None and limit < 0:
        limit = None

    written = 0
    # Comparing the running total to the limit (rather than counting the
    # limit down to exactly zero) also stops cleanly when a reader returns
    # more than it was asked for.
    while limit is None or written < limit:
        if limit is None:
            wanted = chunk
        else:
            wanted = min(limit - written, chunk)
        data = reader(wanted)
        if not data:
            break
        writer(data)
        written += len(data)
    return written
-
# Size of each read/write request used while copying the tail of a file.
CHUNKSIZE = 32768


def trim_file(fname, cap, newsize):
    """Shrink ``fname`` to its last ``newsize`` bytes if it exceeds ``cap``.

    Nothing happens when the file is ``cap`` bytes or smaller.  Otherwise
    the tail is copied to ``<fname>.new``, the original is removed and the
    copy is renamed into its place.  Both handles are closed before the
    swap, including when the copy fails (in which case the original file
    is left untouched).
    """
    fname = path(fname)
    if fname.size <= cap:
        return

    newname = fname + '.new'
    fobjin = None
    fobjout = None
    try:
        fobjin = open(fname, 'rb')
        fobjout = open(newname, 'wb')
        fobjin.seek(-newsize, os.SEEK_END)
        # CHUNKSIZE must go in by keyword: the third positional parameter of
        # streamcopy is ``limit``, which would cap the copy at CHUNKSIZE.
        streamcopy(fobjin, fobjout, chunk=CHUNKSIZE)
    finally:
        for f in (fobjin, fobjout):
            if f is not None:
                f.close()

    # Remove first: renaming onto an existing file fails on some platforms.
    os.remove(fname)
    os.rename(newname, fname)
-
-
-
class PausableStream(object):
    """Wrap a writable stream so writes can be held back and replayed.

    While ``paused`` is set, or while another caller holds the internal
    lock, ``write`` stores the data in a queue instead of writing it.
    ``unpause`` (and ``flush`` when not paused) replays the queued data to
    the wrapped stream in the order it was queued.
    """

    def __init__(self, stream):
        self._lock = Lock()
        self.paused = False
        self.stream = stream
        self._queue = Q.Queue()

    def pause(self):
        """Start queueing writes instead of passing them to the stream."""
        self.paused = True

    def unpause(self):
        """Write out everything queued so far, then clear ``paused``."""
        with self._lock:
            while True:
                try:
                    pending = self._queue.get_nowait()
                except Q.Empty:
                    break
                self.stream.write(pending)
        self.paused = False

    def write(self, data):
        """Write ``data`` now, or queue it; always returns ``len(data)``."""
        # Non-blocking acquire: if the lock is busy (for example a drain is
        # in progress) the data is queued rather than making the caller wait.
        if self.paused or not self._lock.acquire(False):
            self._queue.put(data)
        else:
            try:
                self.stream.write(data)
            finally:
                self._lock.release()
        return len(data)

    def flush(self):
        """Replay queued data (unless paused) and flush the stream."""
        if not self.paused:
            self.unpause()
        return self.stream.flush()

    def close(self):
        """Close the wrapped stream."""
        return self.stream.close()

    def tell(self):
        """Return the wrapped stream's current position."""
        return self.stream.tell()
-
-
-
class SwappableStream(PausableStream):
    """A PausableStream whose underlying stream can be replaced."""

    def start_swap(self):
        """Pause, then flush and close the current underlying stream."""
        self.pause()
        # Flush the wrapped stream directly: it is about to be closed.
        outgoing = self.stream
        outgoing.flush()
        outgoing.close()

    def finish_swap(self, newstream):
        """Install ``newstream`` and unpause on top of it."""
        self.stream = newstream
        self.unpause()
-
-
-
class LimitedFileSize(SwappableStream):
    """File-backed stream that is trimmed whenever it grows past a limit.

    After each write the file size is checked; once it exceeds
    ``filesize_limit`` the file is closed, cut down with ``trim_file`` and
    reopened in append mode.
    """

    def __init__(self, fname, filesize_limit, resize, initmode='wb'):
        """Open ``fname`` with ``initmode`` and remember the size settings.

        Raises ValueError if ``resize`` is greater than ``filesize_limit``.
        """
        # Validate before opening so a bad argument does not leak a handle.
        if resize > filesize_limit:
            raise ValueError(
                'resize must be smaller than filesize_limit. '
                '(resize=%r, filesize_limit=%r)' % (resize, filesize_limit))

        SwappableStream.__init__(self, open(fname, initmode))
        self._szlimit = filesize_limit
        self._fname = fname
        self._resize = resize

    def write(self, data):
        """Write ``data``, trim the file if it is now too large.

        Returns whatever the inherited ``write`` returns.
        """
        written = SwappableStream.write(self, data)
        # Flush so the size check below looks at what is actually on disk.
        self.flush()
        if os.path.getsize(self._fname) > self._szlimit:
            self.start_swap()
            try:
                trim_file(self._fname, self._szlimit, self._resize)
            finally:
                # Reopen even if trimming failed so the stream stays usable.
                self.finish_swap(open(self._fname, 'ab'))
        return written
-
-
-
-
- from ratelimited import RateLimiter
-
class StreamLimiter(RateLimiter):
    """Stream-like front end for a RateLimiter.

    The wrapped stream's ``write`` is handed to ``RateLimiter`` together
    with ``limit`` and ``window``; ``flush``, ``close`` and ``tell`` are
    forwarded to the wrapped stream.
    """

    def __init__(self, stream, limit=4096, window=5):
        self.stream = stream
        RateLimiter.__init__(self, stream.write, limit, window)

    def write(self, data):
        """Pass ``data`` to ``handle_data``."""
        self.handle_data(data)

    def flush(self):
        """Flush the wrapped stream."""
        return self.stream.flush()

    def close(self):
        """Close the wrapped stream."""
        return self.stream.close()

    def tell(self):
        """Return the wrapped stream's current position."""
        return self.stream.tell()

    def too_fast(self, data):
        """Write a notice containing ``self.bps`` to the stream and flush it.

        ``data`` itself is not written.
        NOTE(review): presumably invoked by RateLimiter when the rate limit
        is exceeded -- confirm against the ratelimited module.
        """
        target = self.stream
        target.write('Writing too fast: %r\n' % self.bps)
        target.flush()
-
-
-
class DelayedStreamLimiter(StreamLimiter):
    """StreamLimiter that tracks when the base limiter started refusing data.

    ``_process_stop_time`` is 0 while the base ``handle_data`` returns a
    true value, and otherwise holds the timestamp recorded on the first
    refusal.
    """

    # Seconds compared against the time elapsed since _process_stop_time.
    DELAY = 0.25

    def __init__(self, *a, **k):
        StreamLimiter.__init__(self, *a, **k)
        self._process_stop_time = 0

    def handle_data(self, data):
        """Run the base handler, then apply the DELAY rule on refusal.

        Returns True when the base handler returned a true value or when
        this method passed ``data`` to ``f_process`` itself; otherwise
        returns whether ``_process_stop_time`` is still 0.
        """
        # NOTE(review): assumes a true result from the base class means the
        # data was already handled there -- confirm against RateLimiter.
        if StreamLimiter.handle_data(self, data):
            self._process_stop_time = 0
            return True

        now = time()
        # NOTE(review): while _process_stop_time is 0 this compares the raw
        # clock value with DELAY, so the first refused item is normally not
        # written -- confirm that is the intended behaviour.
        if now - self._process_stop_time < self.DELAY:
            self.f_process(data)
            return True

        if self._process_stop_time == 0:
            # First refusal: remember when it happened.
            self._process_stop_time = now
        return self._process_stop_time == 0
-
-
-
class DisablingStream(object):
    """Stream wrapper that switches itself off after the first failure.

    ``write`` and ``flush`` are instance attributes bound to either the
    real forwarding methods or to the no-op ``disabled``.  When forwarding
    to ``target`` raises, the error is swallowed and the wrapper is
    disabled until ``enable`` is called.
    """

    def __init__(self, target):
        self.target = target
        self.set_enabled(True)

    def write_enabled(self, s):
        """Forward ``s`` to the target; disable the wrapper on failure."""
        try:
            self.target.write(s)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate to the caller.
            self.disable()

    def flush_enabled(self):
        """Flush the target; disable the wrapper on failure."""
        try:
            self.target.flush()
        except Exception:
            self.disable()

    def disable(self):
        """Turn ``write`` and ``flush`` into no-ops."""
        self.set_enabled(False)

    def enable(self):
        """Restore forwarding of ``write`` and ``flush``."""
        self.set_enabled(True)

    def disabled(self, data=None):
        """No-op stand-in for both ``write`` and ``flush``."""
        pass

    def set_enabled(self, val):
        """Bind ``write``/``flush`` to the live or the no-op versions."""
        if val:
            self.flush = self.flush_enabled
            self.write = self.write_enabled
        else:
            self.flush = self.write = self.disabled
-
-
if __name__ == '__main__':
    # Self-test: doctests for streamcopy() and tail(), followed by a manual
    # demonstration of DelayedStreamLimiter writing to stdout.
    from primitives import getrandbytes
    data = getrandbytes(100)
    # Floor division: half_len is used as a length/limit and must be an int.
    half_len = len(data) // 2
    from StringIO import StringIO
    in_ = StringIO(data)
    out = None

    def reset(i):
        """Rewind input stream ``i`` and return a fresh, empty output."""
        i.seek(0)
        return StringIO()

    def check(i, o, l, w):
        """True if ``o`` holds the first ``l`` bytes of ``i`` and ``w == l``."""
        return i.getvalue()[:l] == o.getvalue() and w == l

    __test_stream_copy = (
        '>>> out = reset(in_); written = streamcopy(in_, out); check(in_, out, len(data), written)\nTrue\n'
        '>>> out = reset(in_); written = streamcopy(in_, out, chunk = len(data)); check(in_, out, len(data), written)\nTrue\n'
        '>>> out = reset(in_); written = streamcopy(in_, out, limit = half_len); check(in_, out, half_len, written)\nTrue\n'
        '>>> out = reset(in_); written = streamcopy(in_, out, limit = half_len, chunk = half_len+1); check(in_, out, half_len, written)\nTrue\n'
        '>>> out = reset(in_); written = streamcopy(in_, out, limit = half_len, chunk = half_len-1); check(in_, out, half_len, written)\nTrue\n'
    )
    __test_tail = (
        '>>> in_.seek(0); tail(in_, 5) == in_.getvalue()[-5:]\nTrue\n'
        '>>> in_.seek(0); tail(in_, 1000) == in_.getvalue()\nTrue\n'
    )
    __test__ = dict(streamcopy = __test_stream_copy, tail = __test_tail)
    import doctest
    doctest.testmod(verbose = True)

    # Manual demo: write 20 lines with growing pauses between them.
    import sys
    f = DelayedStreamLimiter(sys.stdout, limit = 8, window = 1)
    import time as time_mod
    for i in range(20):
        f.write(str(i) + '\n')
        time_mod.sleep(0.04 * i)
-
-
-